package org.codehaus.activemq.ra;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnectionConsumer;
import org.codehaus.activemq.ActiveMQSession;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;

/**
 * Endpoint worker that polls a connection consumer for messages and hands each
 * one to a pooled {@link InboundEndpointWork}, which is scheduled on the
 * work manager inherited from the base class.
 *
 * <p>Lifecycle as implemented here: {@link #start()} builds the worker pool,
 * creates the consumer and schedules this object as the dispatcher;
 * {@link #run()} is the dispatcher loop; {@link #stop()} flags the loop to
 * exit, waits for it if it was running, and closes the consumer.
 */
public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker
  implements Work
{
  private static final Log log = LogFactory.getLog(ActiveMQPollingEndpointWorker.class);

  /** Capacity requested for the pool of session/endpoint workers. */
  private static final int MAX_WORKERS = 10;

  /** Milliseconds the dispatcher blocks on the consumer before re-checking the stop flag. */
  private static final long RECEIVE_TIMEOUT_MILLIS = 500L;

  /** Set to true as soon as the dispatcher loop ({@link #run()}) begins executing. */
  private SynchronizedBoolean started = new SynchronizedBoolean(false);

  /** Set to true by {@link #stop()}; polled by the dispatcher loop and shared with the worker pool. */
  private SynchronizedBoolean stopping = new SynchronizedBoolean(false);

  /** Released when the dispatcher loop exits, letting {@link #stop()} proceed. */
  private Latch stopLatch = new Latch();

  private ActiveMQConnectionConsumer consumer;
  private CircularQueue workers;

  /** Work listener that only reports rejected work; the other callbacks are no-ops. */
  static WorkListener debugingWorkListener = new WorkListener() {
    public void workAccepted(WorkEvent event) {
    }

    public void workRejected(WorkEvent event) {
      ActiveMQPollingEndpointWorker.log.warn("Work rejected: " + event, event.getException());
    }

    public void workStarted(WorkEvent event) {
    }

    public void workCompleted(WorkEvent event) {
    }
  };

  /**
   * Creates a worker for the given adapter and activation key; all state is
   * handed to the base class.
   *
   * @param adapter the owning resource adapter
   * @param key     the endpoint activation key
   * @throws ResourceException if the base class constructor fails
   */
  public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key)
    throws ResourceException
  {
    super(adapter, key);
  }

  /**
   * Builds the worker pool, creates the connection consumer described by the
   * activation spec and schedules this object as the dispatcher.
   *
   * <p>If any step fails, including the final scheduling, the consumer created
   * so far is closed before the exception propagates.
   *
   * @throws WorkException     if the work manager refuses the dispatcher
   * @throws ResourceException if the activation spec is invalid or a JMS call fails
   */
  public void start() throws WorkException, ResourceException {
    ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
    boolean ok = false;
    try {
      this.workers = new CircularQueue(MAX_WORKERS, this.stopping);
      for (int i = 0; i < this.workers.size(); i++) {
        ActiveMQSession session = (ActiveMQSession)this.adapter.getPhysicalConnection()
            .createSession(this.transacted, javax.jms.Session.AUTO_ACKNOWLEDGE);
        XAResource xaresource = null;
        if (session instanceof XASession) {
          if (!this.transacted) {
            throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
          }
          xaresource = ((XASession)session).getXAResource();
        }

        MessageEndpoint endpoint = this.endpointFactory.createEndpoint(xaresource);
        this.workers.returnObject(new InboundEndpointWork(session, endpoint, this.workers));
      }

      Destination dest = createDestination(activationSpec);
      String selector = emptyToNull(activationSpec.getMessageSelector());
      String durableName = emptyToNull(activationSpec.getDurableSubscriptionName());

      if (durableName != null) {
        // Durable subscriptions only exist for topics; fail with a meaningful
        // ResourceException instead of a ClassCastException on the cast below.
        if (!(dest instanceof Topic)) {
          throw new ResourceException("Durable subscription '" + durableName
              + "' requires a javax.jms.Topic destination but the destination type is: "
              + activationSpec.getDestinationType());
        }
        this.consumer = (ActiveMQConnectionConsumer)this.adapter.getPhysicalConnection()
            .createDurableConnectionConsumer((Topic)dest, durableName, selector, null, 0);
      } else {
        this.consumer = (ActiveMQConnectionConsumer)this.adapter.getPhysicalConnection()
            .createConnectionConsumer(dest, selector, null, 0);
      }

      this.workManager.scheduleWork(this, WorkManager.INDEFINITE, null, debugingWorkListener);

      // Only mark success once the dispatcher has actually been scheduled, so
      // that a scheduling failure still closes the consumer in the finally block.
      ok = true;
      log.debug("Started");
    }
    catch (JMSException e) {
      throw new ResourceException("Could not start the endpoint.", e);
    }
    finally {
      if (!ok) {
        safeClose(this.consumer);
      }
    }
  }

  /**
   * Maps the destination type and name of the activation spec to a queue or
   * topic object.
   *
   * @throws ResourceException if the destination type is neither
   *         {@code javax.jms.Queue} nor {@code javax.jms.Topic}
   */
  private Destination createDestination(ActiveMQActivationSpec activationSpec) throws ResourceException {
    String type = activationSpec.getDestinationType();
    if ("javax.jms.Queue".equals(type)) {
      return new ActiveMQQueue(activationSpec.getDestinationName());
    }
    if ("javax.jms.Topic".equals(type)) {
      return new ActiveMQTopic(activationSpec.getDestinationName());
    }
    throw new ResourceException("Unknown destination type: " + type);
  }

  /**
   * Returns {@code null} for the empty string and the value itself otherwise
   * (a {@code null} input is returned unchanged).
   */
  private String emptyToNull(String value) {
    if ("".equals(value)) {
      return null;
    }
    return value;
  }

  /**
   * Signals the dispatcher loop to exit, waits for it to finish if it had
   * started running, and closes the consumer.
   *
   * <p>The consumer is closed even if the wait is interrupted, and the call is
   * safe when the worker pool was never created.
   *
   * @throws InterruptedException if interrupted while waiting for the dispatcher
   */
  public void stop()
    throws InterruptedException
  {
    this.stopping.set(true);
    try {
      // workers is only assigned inside start(); guard against stop() being
      // called when start() never ran or failed before creating the pool.
      if (this.workers != null) {
        this.workers.notifyWaiting();
      }
      if (this.started.get()) {
        this.stopLatch.acquire();
      }
    } finally {
      safeClose(this.consumer);
    }
  }

  /** No-op; shutdown is driven by {@link #stop()}. */
  public void release()
  {
  }

  /**
   * Dispatcher loop: until the stop flag is set, receives a message from the
   * consumer, takes a worker from the pool, assigns the message to it and
   * schedules it. On exit the pool is drained and the stop latch is released.
   */
  public void run()
  {
    this.started.set(true);
    try {
      while (!this.stopping.get()) {
        ActiveMQMessage message = this.consumer.receive(RECEIVE_TIMEOUT_MILLIS);
        if (message == null) {
          continue;
        }

        InboundEndpointWork worker = (InboundEndpointWork)this.workers.get();
        if (worker == null) {
          // NOTE(review): a null worker presumably means the pool was woken by
          // stop(); the original code leaves the loop here as well - confirm.
          break;
        }
        worker.message = message;
        this.workManager.scheduleWork(worker, WorkManager.INDEFINITE, null, debugingWorkListener);
      }

      this.workers.drain();
    }
    catch (Throwable e) {
      // The dispatcher terminates here and no further messages are delivered,
      // so report this as an error rather than at info level.
      log.error("dispatcher: ", e);
    }
    finally {
      this.stopLatch.release();
    }
  }

  /**
   * Unit of work that delivers one message through its session to its message
   * endpoint and then returns itself to the worker pool.
   */
  public static class InboundEndpointWork
    implements Work
  {
    private final ActiveMQSession session;
    private final MessageEndpoint endpoint;
    private final CircularQueue workers;

    /** Message to deliver; assigned by the dispatcher before this work is scheduled. */
    ActiveMQMessage message;

    /**
     * Binds the endpoint to the session as its message listener.
     *
     * @param session  session used to dispatch messages
     * @param endpoint endpoint receiving the messages; must also implement {@link MessageListener}
     * @param workers  pool this work returns itself to after each delivery
     * @throws JMSException if the listener cannot be set on the session
     */
    public InboundEndpointWork(ActiveMQSession session, MessageEndpoint endpoint, CircularQueue workers)
      throws JMSException
    {
      this.session = session;
      this.endpoint = endpoint;
      this.workers = workers;
      session.setMessageListener((MessageListener)endpoint);
    }

    /** No-op. */
    public void release()
    {
    }

    /**
     * Delivers the current message between {@code beforeDelivery} and
     * {@code afterDelivery} calls on the endpoint, then returns this work to
     * the pool regardless of the outcome.
     */
    public void run()
    {
      try {
        this.endpoint.beforeDelivery(ActiveMQBaseEndpointWorker.ON_MESSAGE_METHOD);
        try {
          this.session.dispatch(this.message);
          this.session.run();
        } finally {
          // afterDelivery must pair with every successful beforeDelivery.
          this.endpoint.afterDelivery();
        }
      }
      catch (NoSuchMethodException e) {
        ActiveMQPollingEndpointWorker.log.warn("worker: ", e);
      }
      catch (ResourceException e) {
        ActiveMQPollingEndpointWorker.log.warn("worker: ", e);
      }
      finally {
        this.workers.returnObject(this);
      }
    }
  }
}